package freedom.needle.task.impl;

import java.util.Iterator;
import java.util.List;

import freedom.needle.dao.TradeXmlTempDao;
import freedom.needle.entity.TradeXmlTemp;
import freedom.needle.enums.TradeXmlStatusEnum;
import freedom.needle.mod.StaticModStrategy;
import freedom.needle.queue.BlockingDistinctQueue;
import freedom.needle.tps.TpsWindow;
import freedom.needle.util.WeblogicUtil;

/**
 * Cycle task that selects {@link TradeXmlTemp} rows whose status is
 * {@link TradeXmlStatusEnum#STEP_1_READY} from {@code trade_xml_temp} and puts
 * them into a {@link BlockingDistinctQueue}.
 *
 * <p>Only rows satisfying {@code mod(id, nodes) = modValue} are selected, where
 * {@code nodes} comes from {@link WeblogicUtil#getNodes()} and {@code modValue}
 * from {@link StaticModStrategy#getModValue()}.
 *
 * <p>All collaborators are injected through setters and must be set before
 * {@link #execute()} is called.
 */
public class TradeXmlTempProducer extends AbstractCycleTask {

	/**
	 * Log line format; arguments in order: thread id, node count, mod value,
	 * rows selected, rows put into the queue, current sleep time.
	 */
	private static final String INFO_FORMAT = "[threadId:%d, nodes:%d, modValue:%d]: selected %d from db, puted %d into queue, sleeping %d millis...";

	/**
	 * Query parameters in order: status, batch size, node count, mod value.
	 *
	 * <p>The {@code limit} is applied in the inner query, before the
	 * {@code mod} filter, so a call can return fewer than {@code batchSize}
	 * rows (possibly none) even when more matching rows exist.
	 * NOTE(review): if the first {@code batchSize} ready rows all belong to
	 * other mod values this producer selects nothing — confirm this is intended.
	 */
	private static final String SELECT_READY_SQL = "select * from (select * from trade_xml_temp where status = ? limit ? ) as a where mod(id,?) = ?";

	private int batchSize;
	private BlockingDistinctQueue<TradeXmlTemp> queue;
	private TradeXmlTempDao tradeXmlTempDao;
	private TpsWindow tpsWindow;

	/**
	 * @return this task's class, {@code TradeXmlTempProducer.class}
	 */
	@Override
	public Class<?> getClazz() {
		return TradeXmlTempProducer.class;
	}

	/**
	 * Runs one production cycle.
	 *
	 * <p>If the queue reports that it is full, nothing is done and the sleep
	 * strategy is left untouched. Otherwise one batch is selected and every row
	 * is offered to the queue. The sleep strategy is decreased when every
	 * selected row was accepted by the queue, and increased when the batch was
	 * empty (or {@code null}) or at least one row was rejected.
	 *
	 * @throws Exception if the DAO, the queue or another collaborator fails
	 */
	@Override
	public void execute() throws Exception {
		if (!queue.notFull()) {
			return;
		}

		int nodes = WeblogicUtil.getNodes();
		int modValue = StaticModStrategy.getModValue();
		Object[] params = {TradeXmlStatusEnum.STEP_1_READY.name(), batchSize, nodes, modValue};
		List<TradeXmlTemp> rows = tradeXmlTempDao.selectBySql(SELECT_READY_SQL, params);

		// A null result is treated like an empty one instead of failing with a
		// NullPointerException: there is nothing to produce, so back off.
		if (rows == null || rows.isEmpty()) {
			sleepStrategy.increase();
			return;
		}

		int queued = 0;
		for (TradeXmlTemp row : rows) {
			// Only rows for which put(...) returns true are counted
			// (presumably false means duplicate or no capacity — confirm).
			if (queue.put(row)) {
				queued++;
				if (tpsWindow.isOpen()) {
					tpsWindow.inc();
				}
			}
		}

		// rows is non-empty here, so "all rows queued" also implies queued > 0.
		if (queued == rows.size()) {
			sleepStrategy.decrease();
		} else {
			sleepStrategy.increase();
		}

		if (logger.isInfoEnabled()) {
			long threadId = Thread.currentThread().getId();
			logger.info(String.format(INFO_FORMAT, threadId, nodes, modValue, rows.size(), queued, this.sleepStrategy.getTime()));
		}
	}

	/**
	 * @param batchSize the value bound to the {@code limit} placeholder of the select
	 */
	public void setBatchSize(int batchSize) {
		this.batchSize = batchSize;
	}

	/**
	 * @param queue the queue that selected rows are put into
	 */
	public void setQueue(BlockingDistinctQueue<TradeXmlTemp> queue) {
		this.queue = queue;
	}

	/**
	 * @param tradeXmlTempDao the DAO used to run the select
	 */
	public void setTradeXmlTempDao(TradeXmlTempDao tradeXmlTempDao) {
		this.tradeXmlTempDao = tradeXmlTempDao;
	}

	/**
	 * @param tpsWindow the window incremented once per queued row while it is open
	 */
	public void setTpsWindow(TpsWindow tpsWindow) {
		this.tpsWindow = tpsWindow;
	}

}
